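//
// Reception management for member (user) WebSocket client connections.
// Main responsibilities:
//    1. Member client login and logout.
//    2. Only one client per device type (phone/pad/web/pc) may be logged in at a time.
//    3. Communication with the home center: forwarding messages to it and
//       receiving the push messages it sends.
//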

package member

import (
	"adai.design/jarvis/common/log"
	"errors"
	"github.com/gorilla/websocket"
	"net/http"
)

// PkgObserver is implemented by anything that wants to be handed the member
// packages the reception hub reads from its rpkg channel.
type PkgObserver interface {
	// MemberPkgHandler is invoked once for each such package, synchronously
	// from the hub's event loop.
	MemberPkgHandler(pkg *MessagePkg)
}

// receptions is the hub that tracks the connected clients of every member and
// routes packages between those clients and the registered observers.
type receptions struct {
	// Connected clients per member: member id -> that member's clients.
	members map[string][]*reception

	// Events announcing that a client has come online or gone offline.
	online  chan *reception
	offline chan *reception

	// rpkg carries packages that are handed to every observer;
	// spkg carries packages that are delivered to online clients.
	rpkg chan *MessagePkg
	spkg chan *MessagePkg

	observers []PkgObserver

	// WebSocket service settings.
	upgrader websocket.Upgrader
	address  string
	end      chan bool
}

// ServeHTTP upgrades an incoming HTTP request to a WebSocket connection and
// starts a reception goroutine that serves it.
//
// If the upgrade fails the error is logged and the request is abandoned; no
// further response is written here.
func (rs *receptions) ServeHTTP(response http.ResponseWriter, request *http.Request) {
	conn, err := rs.upgrader.Upgrade(response, request, nil)
	if err != nil {
		log.Error("upgrader: %s", err)
		return
	}

	// Bind the new reception to the hub that accepted the connection (the
	// receiver) rather than reaching for the package-level singleton, so the
	// handler behaves correctly for any *receptions instance.
	r := &reception{conn: conn}
	go r.start(rs)
}

// start runs the hub's event loop. It never returns and is the only place in
// this file that mutates rs.members. Each iteration handles one of:
//
//   - online:  register a client, first logging out any client of the same
//     member that uses the same device type;
//   - offline: unregister a client;
//   - spkg:    deliver a package to a member's online clients;
//   - rpkg:    hand a package to every registered observer.
//
// A receive that reports a closed channel (ok == false) is ignored.
// NOTE(review): a closed channel is always ready, so if any of these channels
// were ever closed this loop would spin; nothing in this file closes them.
func (rs *receptions) start() {
	for {
		select {

		// A client has logged in.
		case r, ok := <-rs.online:
			if !ok {
				continue
			}
			log.Warn("account(%s).%s  online", r.account.Account(), r.client.DevType)

			// Rebuild the member's client list instead of deleting from the
			// slice while ranging over it: in-place deletion shifts elements
			// under the iterator, which skips the next entry and revisits a
			// stale tail slot.
			current := rs.members[r.account.Id]
			kept := make([]*reception, 0, len(current)+1)
			for _, c := range current {
				if c.client.DevType == r.client.DevType {
					// Only one client per device type may stay logged in.
					c.logout("other-device-login")
					continue
				}
				kept = append(kept, c)
			}
			rs.members[r.account.Id] = append(kept, r)

		// A client has gone offline.
		case r, ok := <-rs.offline:
			if !ok {
				continue
			}
			current, known := rs.members[r.account.Id]
			if !known {
				continue
			}
			remaining := make([]*reception, 0, len(current))
			for _, c := range current {
				if c == r {
					log.Warn("account(%s).%s offline", r.account.Account(), r.client.DevType)
					continue
				}
				remaining = append(remaining, c)
			}
			if len(remaining) == 0 {
				// Drop the entry entirely so the map does not accumulate an
				// empty slice for every member that has ever connected.
				delete(rs.members, r.account.Id)
			} else {
				rs.members[r.account.Id] = remaining
			}

		// A package has to be delivered to online clients.
		case pkg, ok := <-rs.spkg:
			if !ok {
				continue
			}
			// Ranging over the nil slice of an unknown member is a no-op.
			clients := rs.members[pkg.Ctx.MemberId]

			// NOTE(review): the sends on task below block this loop until the
			// client accepts the message — confirm task is buffered or always
			// drained promptly by the reception goroutine.
			if pkg.Ctx.Session == "" {
				// No session given: deliver to every client of the member.
				for _, c := range clients {
					c.task <- pkg.Msg
				}
				continue
			}
			// Session given: deliver only to the first client with that session.
			for _, c := range clients {
				if c.client.Session == pkg.Ctx.Session {
					c.task <- pkg.Msg
					break
				}
			}

		// A package has to be handed to every registered observer.
		case pkg, ok := <-rs.rpkg:
			if !ok {
				continue
			}
			for _, observer := range rs.observers {
				observer.MemberPkgHandler(pkg)
			}
		}
	}
}

// workers is the package-wide reception hub. It is built once during package
// initialisation, with an empty member table and four channels buffered to
// ten entries each, and its event loop is started in a goroutine right away.
var workers = func() *receptions {
	hub := new(receptions)

	hub.members = make(map[string][]*reception)
	hub.online = make(chan *reception, 10)
	hub.offline = make(chan *reception, 10)
	hub.rpkg = make(chan *MessagePkg, 10)
	hub.spkg = make(chan *MessagePkg, 10)

	go hub.start()
	return hub
}()

// SendPackage queues pkg on the hub's spkg channel for delivery to online
// clients.
//
// It returns a "message-package-format-error" error when pkg, pkg.Ctx or
// pkg.Msg is nil, and nil once the package has been queued. The call blocks
// while the spkg channel is full.
func SendPackage(pkg *MessagePkg) error {
	wellFormed := pkg != nil && pkg.Ctx != nil && pkg.Msg != nil
	if !wellFormed {
		return errors.New("message-package-format-error")
	}

	workers.spkg <- pkg
	return nil
}

// AddObserver registers ob so that it is handed every package the hub reads
// from its rpkg channel.
//
// It returns an error when ob is nil: a nil observer would otherwise be stored
// and only fail later, as a nil-interface method call inside the hub's event
// loop. On success it returns nil. Registering the same observer more than
// once adds it more than once.
//
// NOTE(review): the observer list is also read by the hub goroutine and no
// synchronisation is visible in this file — confirm observers are registered
// before packages start flowing.
func AddObserver(ob PkgObserver) error {
	if ob == nil {
		return errors.New("observer-nil-error")
	}
	workers.observers = append(workers.observers, ob)
	return nil
}

// RemoveObserver unregisters the first occurrence of ob from the hub's
// observer list. An observer that is not registered is silently ignored.
// The returned error is always nil.
func RemoveObserver(ob PkgObserver) error {
	registered := workers.observers
	for idx := range registered {
		if registered[idx] != ob {
			continue
		}
		// Cut the match out; stopping right away keeps the iteration from
		// touching the slice after it has been modified.
		workers.observers = append(registered[:idx], registered[idx+1:]...)
		break
	}
	return nil
}
